package org.jetlinks.pro.clickhouse.metadata;

import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.core.ValueCodec;
import org.hswebframework.ezorm.core.meta.ObjectMetadata;
import org.hswebframework.ezorm.rdb.executor.SqlRequests;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ColumnWrapperContext;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers;
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
import org.hswebframework.ezorm.rdb.metadata.RDBSchemaMetadata;
import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata;
import org.hswebframework.ezorm.rdb.metadata.ValueCodecFactory;
import org.hswebframework.ezorm.rdb.metadata.parser.TableMetadataParser;
import org.jetlinks.pro.clickhouse.ClickHouseDataType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Optional;

@AllArgsConstructor
public class ClickHouseTableMetadataParser implements TableMetadataParser {

    private final RDBSchemaMetadata schema;

    public ReactiveSqlExecutor getSqlExecutor() {
        return schema.findFeatureNow(ReactiveSqlExecutor.ID);
    }

    @Override
    public List<String> parseAllTableName() {
        throw new UnsupportedOperationException();
    }

    @Override
    public Flux<String> parseAllTableNameReactive() {
        return this
            .getSqlExecutor()
            .select("show tables", ResultWrappers.map())
            .map(map -> String.valueOf(map.get("name")));
    }

    @Override
    public boolean tableExists(String name) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Mono<Boolean> tableExistsReactive(String name) {
        return this
            .parseAllTableNameReactive()
            .any(name::equalsIgnoreCase);
    }

    @Override
    public Optional<? extends ObjectMetadata> parseByName(String name) {
        throw new UnsupportedOperationException();
    }

    @Override
    public List<? extends ObjectMetadata> parseAll() {
        throw new UnsupportedOperationException();
    }

    @Override
    public Mono<? extends ObjectMetadata> parseByNameReactive(String name) {
        RDBTableMetadata tableMetadata = schema.newTable(name);
        tableMetadata.setName(name);
        return this
            .getSqlExecutor()
            .select(SqlRequests
                        .of(
                            "SELECT name,comment,type,is_in_primary_key FROM system.columns where table = ? and database = ?",
                            name, schema.getName()
                        ),
                    new ClickHouseColumnResultWrapper(tableMetadata))
            .doOnNext(tableMetadata::addColumn)
            .then(Mono.just(tableMetadata))
            .filter(table -> table.getColumns().size() > 0);
    }

    @Override
    public Flux<? extends ObjectMetadata> parseAllReactive() {
        return this
            .parseAllTableNameReactive()
            .flatMap(this::parseByNameReactive);
    }

    @AllArgsConstructor
    static class ClickHouseColumnResultWrapper implements ResultWrapper<RDBColumnMetadata, Void> {
        private final RDBTableMetadata table;

        @Override
        public RDBColumnMetadata newRowInstance() {
            return table.newColumn();
        }

        @Override
        public void wrapColumn(ColumnWrapperContext<RDBColumnMetadata> context) {
            String column = context.getColumnLabel();
            String value = String.valueOf(context.getResult());

            if ("name".equalsIgnoreCase(column)) {
                context.getRowInstance().setName(value);
            } else if ("type".equalsIgnoreCase(column)) {
                ClickHouseDataType dataType = ClickHouseDataType
                    .lookup(value)
                    .orElseThrow(() -> new UnsupportedOperationException("unsupported column[" + context
                        .getRowInstance()
                        .getName() + "] data type:" + value));
                context.getRowInstance().setType(dataType);
            } else if ("is_in_primary_key".equalsIgnoreCase(column)) {
                context.getRowInstance()
                       .setPrimaryKey("1".equals(value));
            } else if ("comment".equalsIgnoreCase(column)) {
                context.getRowInstance()
                       .setComment(value);
            }
        }

        @Override
        public boolean completedWrapRow(RDBColumnMetadata column) {
            column.findFeature(ValueCodecFactory.ID)
                  .flatMap(factory -> factory.createValueCodec(column))
                  .ifPresent(column::setValueCodec);
            return true;
        }

        @Override
        public Void getResult() {
            return null;
        }
    }
}
